package com.cloudansys.core.flink.sink;

import cn.hutool.core.date.DateUtil;
import com.cloudansys.config.DefaultConfig;
import com.cloudansys.core.constant.Const;
import com.cloudansys.core.entity.MultiDataEntity;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.influxdb.InfluxDB;
import org.influxdb.dto.Point;

import java.util.List;
import java.util.Locale;
import java.util.concurrent.TimeUnit;

@Slf4j
public class RawInfluxSink extends RichSinkFunction<List<MultiDataEntity>> {

    private InfluxDB influxDB;

    /**
     * 处理前初始化配置信息
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        influxDB = DefaultConfig.getInfluxDB();
    }

    /**
     * 处理流元素
     */
    @Override
    public void invoke(List<MultiDataEntity> element, Context context) {
        try {
            if (element.size() != 0) {
                for (MultiDataEntity multiDataEntity : element) {
                    Point point = getPoint(multiDataEntity);
                    if (null != point) {
                        influxDB.write(point);
                    }
                }
                log.info("【raw】save to influx success: {}", element.get(0).getTypeTag());
            }
        } catch (Exception e) {
            log.error("【raw】save to influx failed");
            e.printStackTrace();
        }
    }

    /**
     * 构建 influxDB 中的一条记录 point
     * 注：field 只存储该类型指标的 display 为 1 的指标
     */
    private Point getPoint(MultiDataEntity multiDataEntity) {
        try {
            String projectId = multiDataEntity.getProjectId();
            String serialCode = multiDataEntity.getSerialCode();
            String targetType = multiDataEntity.getTypeTag().toLowerCase();
            String pickTime = multiDataEntity.getPickTime();
            Double[] values = multiDataEntity.getValues();
            long timestamp = DateUtil.parse(pickTime, Const.FMT_TRIM_MILLI).getTime();
            Point.Builder pointBuilder = Point
                    .measurement(Const.INFLUX_MEASUREMENT_BASE_RAW + targetType)
                    // 时间一律采用毫秒级
                    .time(timestamp, TimeUnit.MILLISECONDS)
                    .tag(Const.PID, projectId)
                    .tag(Const.SC, serialCode);
            for (int i = 0; i < values.length; i++) {
                // 存储所有指标
                String fieldName = Const.INFLUX_MEASUREMENT_FIELD_BASE + (i + 1);
                // 数值一律采用 double 类型
                double fieldValue = values[i];
                // 添加 field
                pointBuilder.addField(fieldName, fieldValue);
            }
            return pointBuilder.build();
        } catch (Exception e) {
            log.error("get point failed");
            e.printStackTrace();
        }
        return null;
    }

    /**
     * 关闭连接对象
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (influxDB != null) {
            influxDB.close();
        }
    }

}
